package com.migrate.module.service;

import com.migrate.module.domain.ScrollDomain;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * canal mq消息消费任务
 *
 * @author zhonghuashishan
 */
@Component
public class CanalConsumeTask implements ApplicationRunner
{
    /**
     * rocketmq的nameServer地址
     */
    @Value("${rocketmq.name-server:127.0.0.1:9876}")
    private String nameServerUrl;

    // 这个东西里面，其实是可以需要去拿到我们增量同步配置
    // 你要从rocketmq里监听到binlog变更，你要关注的是哪些库哪些表的binlog变更
    @Autowired
    private MigrateConfigService migrateConfigService;

    @Override
    public void run(ApplicationArguments args) throws Exception{
        // 当前配置的全部迁移系统
        List<ScrollDomain> scrollDomainList = migrateConfigService.queryScrollDomainList();

        // 直接会在这里创建一个线程池，线程数量，是跟我们要同步的数据来源的配置的数量，是一致的
        ExecutorService executors = Executors.newFixedThreadPool(scrollDomainList.size());

        for (ScrollDomain scrollDomain : scrollDomainList){
            if (scrollDomain.getDataSourceType().equals(1)){
                // 我们的话在从rocketmq里拉取binlog的时候，避免让consumer自动去提交offset
                // 精准的去控制offset提交，我们要确保每一条binlog都已经被应用到你的目标数据源里去了
                // 此时对这条offset才能去做一个提交，rocketmq模块，consumer提交offset都是自动提交，先提交到本次缓存里，再提交到rocketmq里去
                // 导致offset提交不是太精准

                // 执行拉取任务
                executors.execute(new CanalPullRunner(scrollDomain.getDomainTopic(), nameServerUrl));
                // 执行提交任务
                executors.execute(new CanalPullCommitRunner(scrollDomain.getDomainTopic(), nameServerUrl));
            }
        }
    }
}
